package com.springchang.easyelasticsearch.core;

import com.alibaba.fastjson.JSON;
import com.springchang.easyelasticsearch.annotation.EsField;
import com.springchang.easyelasticsearch.annotation.EsIndex;
import com.springchang.easyelasticsearch.annotation.EsLogicId;
import com.springchang.easyelasticsearch.exception.IllegalFieldException;
import com.springchang.easyelasticsearch.exception.IndexExistException;
import com.springchang.easyelasticsearch.valuable.EsTypes;
import com.springchang.easyelasticsearch.valuable.Paramater;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.ScoreSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author : 张翠山
 */
public abstract class RestHighLevelClientAbs {

    /** SLF4J logger created for this class; {@code protected}, so subclasses can log through it too. */
    protected static final Logger log = LoggerFactory.getLogger(RestHighLevelClientAbs.class);

    /**
     * Elasticsearch high-level REST client used by the Elasticsearch calls in this class.
     * It can be assigned through {@link #setRestHighLevelClient(RestHighLevelClient)}.
     */
    protected RestHighLevelClient restHighLevelClient;

    /**
     * Returns the Elasticsearch client currently held by this instance.
     *
     * @return the value of the {@code restHighLevelClient} field (may be {@code null} if never assigned)
     */
    public RestHighLevelClient getRestHighLevelClient() {
        return restHighLevelClient;
    }

    /**
     * Replaces the Elasticsearch client held by this instance.
     *
     * @param restHighLevelClient the client to store in the {@code restHighLevelClient} field
     */
    public void setRestHighLevelClient(RestHighLevelClient restHighLevelClient) {
        this.restHighLevelClient = restHighLevelClient;
    }

    /**
     * Runs the search described by {@code selector} and returns the {@code _source} map of every hit.
     *
     * <p>Paging: a negative {@code from} is replaced by 0 and a non-positive {@code size} by 20.
     * Sorting: each entry of the selector's sort map is applied in iteration order
     * ({@code true} = ascending, {@code false} = descending); when the map is empty the hits are
     * sorted by score, descending. The request uses a 60 second search timeout.
     *
     * <p>Source filtering is applied when the selector supplies include fields, exclude fields,
     * or both.
     *
     * @param selector query description; must be non-null and carry a non-empty index name
     * @return the source maps of the hits, or an immutable empty list when nothing matched
     * @throws IllegalArgumentException     if {@code selector} is null or its index is null/empty
     * @throws ElasticsearchStatusException propagated unchanged from the client
     * @throws IOException                  propagated unchanged from the client
     */
    protected List<Map<String, Object>> query(Selector selector) throws IllegalArgumentException, ElasticsearchStatusException, IOException {
        if (selector == null || selector.getIndex() == null || "".equals(selector.getIndex())) {
            throw new IllegalArgumentException("查询包装类为空或索引为空");
        }

        SearchRequest searchRequest = new SearchRequest(selector.getIndex());
        searchRequest.types("_doc");

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(selector.getBoolQueryBuilder());
        sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

        // Paging, with defaults for out-of-range values.
        int from = selector.getFrom();
        int size = selector.getSize();
        sourceBuilder.from(from < 0 ? 0 : from);
        sourceBuilder.size(size <= 0 ? 20 : size);

        // Sorting: explicit fields if present, otherwise relevance.
        if (!CollectionUtils.isEmpty(selector.getSortFieldsToAsc())) {
            selector.getSortFieldsToAsc().forEach((k, v) ->
                    sourceBuilder.sort(new FieldSortBuilder(k).order(v ? SortOrder.ASC : SortOrder.DESC)));
        } else {
            sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
        }

        // Source filtering: either list alone is a valid filter. The previous code required
        // both to be non-empty and silently ignored an include-only or exclude-only selector.
        String[] includeFields = selector.getIncludeFields();
        String[] excludeFields = selector.getExcludeFields();
        boolean hasIncludes = includeFields != null && includeFields.length > 0;
        boolean hasExcludes = excludeFields != null && excludeFields.length > 0;
        if (hasIncludes || hasExcludes) {
            sourceBuilder.fetchSource(includeFields, excludeFields);
        }

        searchRequest.source(sourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        List<Map<String, Object>> sources = Arrays.stream(searchResponse.getHits().getHits())
                .map(SearchHit::getSourceAsMap)
                .collect(Collectors.toList());
        if (sources.isEmpty()) {
            return Collections.emptyList();
        }
        return sources;
    }

    /**
     * Counts the documents in the selector's index that match the selector's bool query.
     *
     * <p>Any exception raised by the count call itself is logged and reported as {@code 0};
     * it is not propagated, so a result of 0 can mean either "no matches" or "count failed".
     *
     * @param selector query description; must be non-null and carry a non-empty index name
     * @return the number of matching documents, or 0 when the count call fails
     * @throws IllegalArgumentException if {@code selector} is null or its index is null/empty
     */
    public long count(Selector selector) throws IllegalArgumentException, ElasticsearchStatusException {
        boolean indexMissing = selector == null || selector.getIndex() == null || "".equals(selector.getIndex());
        if (indexMissing) {
            throw new IllegalArgumentException("查询包装类为空或索引为空");
        }

        String[] targetIndices = {selector.getIndex()};
        CountRequest countRequest = new CountRequest(targetIndices, selector.getBoolQueryBuilder());

        long total;
        try {
            total = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT).getCount();
        } catch (Exception failure) {
            log.error("exception: {}", failure.getMessage(), failure);
            total = 0;
        }
        return total;
    }

    /**
     * Indexes one JSON document into {@code indexName}. No document id is set on the request,
     * and the request uses the {@code IMMEDIATE} refresh policy.
     *
     * <p>Failures keep their original cause: an Elasticsearch-side failure is rethrown as an
     * {@link ElasticsearchStatusException} carrying the original status and exception, and an
     * I/O failure is wrapped in a {@link RuntimeException} with the {@link IOException} as cause.
     *
     * @param indexName target index
     * @param jsonStr   document source as a JSON string
     * @throws ElasticsearchStatusException if Elasticsearch rejects the request
     * @throws RuntimeException             if the request fails with an {@link IOException}
     */
    protected void addData(String indexName, String jsonStr) {
        IndexRequest request = new IndexRequest(indexName);
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        request.source(jsonStr, XContentType.JSON);

        IndexResponse indexResponse;
        try {
            indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchException e) {
            log.error("新增数据出错: {}", e.getMessage(), e);
            // Keep the REST status AND the original exception so callers can diagnose the failure.
            throw new ElasticsearchStatusException("向 ElasticSearch 新增数据失败!", e.status(), e);
        } catch (IOException e) {
            // The signature has no checked exceptions; wrap, but never drop the cause.
            throw new RuntimeException(e.getMessage(), e);
        }

        if (indexResponse == null) {
            return;
        }

        if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
            log.info("新增文档: [index = {}, id = {}, version = {} ]",
                    indexResponse.getIndex(), indexResponse.getId(), indexResponse.getVersion());
        } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            log.info("修改文档成功");
        }

        // Report shard-level problems: partial success and per-replica failure reasons.
        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            log.debug("分片处理信息...");
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                log.error("副本失败原因: {}", failure.reason());
            }
        }
    }

    /**
     * Partially updates the documents whose field {@code idName} equals {@code idValue} with the
     * JSON form of {@code entity}.
     *
     * <p>The documents are located with a term query limited to two hits; every hit is then
     * updated by its Elasticsearch document id using the {@code IMMEDIATE} refresh policy.
     *
     * @param index   index to search and update
     * @param idName  name of the business-key field used in the term query
     * @param idValue value of the business key
     * @param entity  object serialized to JSON and sent as the partial document
     * @param <T>     type of {@code entity}
     * @return {@code true} if at least one document matched and was sent for update,
     *         {@code false} if no document matched
     * @throws NullPointerException if {@code index}, {@code idName} or {@code idValue} is empty
     * @throws IOException          propagated from the search or update call
     */
    protected <T> boolean update(String index, String idName, Object idValue, T entity) throws NullPointerException, IOException {
        if (StringUtils.isEmpty(index)) {
            throw new NullPointerException("索引[" + index + "]为空");
        }
        if (StringUtils.isEmpty(idName)) {
            throw new NullPointerException("[主键名" + idName + "]为空");
        }
        if (StringUtils.isEmpty(idValue)) {
            throw new NullPointerException("[主键值" + idValue + "]为空");
        }

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.termQuery(idName, idValue));
        searchSourceBuilder.size(2);
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(searchSourceBuilder);

        SearchHit[] searchHits = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT).getHits().getHits();
        if (searchHits.length == 0) {
            // Nothing matched the business key, so nothing was updated.
            return false;
        }

        // Serialize once; the same partial document is sent to every hit.
        String partialDoc = JSON.toJSONString(entity);
        for (SearchHit hit : searchHits) {
            UpdateRequest request = new UpdateRequest(index, "_doc", hit.getId());
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            request.doc(partialDoc, XContentType.JSON);
            restHighLevelClient.update(request, RequestOptions.DEFAULT);
        }
        return true;
    }

    // TODO: the logical-delete field name ("deleted") should be discovered from an annotation
    //       instead of being hard-coded here.
    /**
     * Logically deletes the documents whose field {@code idName} equals {@code idValue} by
     * setting their {@code deleted} field to {@code true}.
     *
     * <p>The documents are located with a term query limited to two hits; every hit is then
     * updated by its Elasticsearch document id using the {@code IMMEDIATE} refresh policy.
     *
     * @param index   index to search and update
     * @param idName  name of the business-key field used in the term query
     * @param idValue value of the business key
     * @param <T>     unused; kept so existing call sites keep compiling
     * @return {@code true} if at least one document matched and was flagged,
     *         {@code false} if no document matched
     * @throws NullPointerException if {@code index}, {@code idName} or {@code idValue} is empty
     * @throws IOException          propagated from the search or update call
     */
    protected <T> boolean logicDelete(String index, String idName, Object idValue) throws NullPointerException, IOException {
        if (StringUtils.isEmpty(index)) {
            throw new NullPointerException("索引[" + index + "]为空");
        }
        if (StringUtils.isEmpty(idName)) {
            throw new NullPointerException("[主键名" + idName + "]为空");
        }
        if (StringUtils.isEmpty(idValue)) {
            throw new NullPointerException("[主键值" + idValue + "]为空");
        }

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.termQuery(idName, idValue));
        searchSourceBuilder.size(2);
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(searchSourceBuilder);

        SearchHit[] searchHits = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT).getHits().getHits();
        if (searchHits.length == 0) {
            // Nothing matched the business key, so nothing was flagged.
            return false;
        }

        // The partial document is identical for every hit, so build it once.
        String deletedFlagDoc = JSON.toJSONString(Collections.singletonMap("deleted", true));
        for (SearchHit hit : searchHits) {
            UpdateRequest request = new UpdateRequest(index, "_doc", hit.getId());
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            request.doc(deletedFlagDoc, XContentType.JSON);
            restHighLevelClient.update(request, RequestOptions.DEFAULT);
        }
        return true;
    }

    /**
     * Bulk-indexes every map in {@code list} as one document into {@code indexName}.
     *
     * <p>The bulk request uses a {@code "1m"} timeout, the {@code IMMEDIATE} refresh policy and
     * {@code waitForActiveShards(2)}. Each item of the response is inspected: failed items are
     * logged at error level with their failure message, successful ones at info level.
     *
     * <p>NOTE(review): {@code waitForActiveShards(2)} presumably cannot be satisfied on a
     * single-node cluster — confirm the setting is intended for every deployment.
     *
     * @param indexName target index
     * @param list      documents to index, one map per document
     * @throws IllegalArgumentException if {@code list} is null/empty or {@code indexName} is empty
     * @throws RuntimeException         if the bulk call fails with an {@link IOException}
     *                                  (the {@code IOException} is kept as cause)
     */
    protected void bulkDate(String indexName, List<Map<String,Object>> list ) throws IllegalArgumentException {
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("新增数组长度为零或者为空");
        }
        if (StringUtils.isEmpty(indexName)) {
            throw new IllegalArgumentException("指定的文档索引为空");
        }

        BulkRequest request = new BulkRequest();
        for (Map<String, Object> document : list) {
            request.add(new IndexRequest(indexName).source(document, XContentType.JSON));
        }
        request.timeout("1m");
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        request.waitForActiveShards(2);

        BulkResponse bulkResponse;
        try {
            bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Previously swallowed via printStackTrace(), which made a failed bulk insert look
            // like a success. Surface it the same way addData does, keeping the cause.
            log.error("批量新增数据出错: {}", e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        }
        if (bulkResponse == null) {
            return;
        }

        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            if (bulkItemResponse.isFailed()) {
                // A failed item has no response object; dereferencing it used to throw NPE.
                log.error("批量新增失败: [index = {}, id = {}, reason = {}]",
                        bulkItemResponse.getIndex(), bulkItemResponse.getId(), bulkItemResponse.getFailureMessage());
                continue;
            }
            // Only IndexRequests are added above, so every successful item is an index operation.
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();
            log.info("新增成功,{}", itemResponse);
        }
    }

    /**
     * Reads the Elasticsearch metadata declared on {@code entity}'s class through annotations:
     * the index name from the class-level {@link EsIndex} and the business-key name and current
     * value from the single field annotated with {@link EsLogicId}.
     *
     * <p>Only fields declared directly on the entity's class are inspected.
     *
     * @param entity annotated object to inspect
     * @param <T>    type of {@code entity}
     * @return a {@link Paramater} holding the index name, the id field name and the id value
     * @throws NullPointerException     if {@code entity} is null
     * @throws IllegalArgumentException if the class has no {@code @EsIndex} or its name is empty
     * @throws IllegalStateException    if the class does not have exactly one {@code @EsLogicId}
     *                                  field, or that annotation's name is empty
     * @throws IllegalAccessException   if the id field's value cannot be read reflectively
     */
    protected <T> Paramater parseGeneric(T entity) throws NullPointerException, IllegalArgumentException, IllegalAccessException {
        if (entity == null) {
            throw new NullPointerException("传入的对象为空");
        }

        Class<?> clazz = entity.getClass();

        // Index name comes from the class-level @EsIndex annotation.
        EsIndex indexAnnotation = clazz.getAnnotation(EsIndex.class);
        if (indexAnnotation == null) {
            throw new IllegalArgumentException(clazz.getSimpleName() + "类定义缺少@EsIndex注解");
        }
        String indexName = indexAnnotation.name();
        if (StringUtils.isEmpty(indexName)) {
            throw new IllegalArgumentException(clazz.getSimpleName() + "类定义@EsIndex注解name()属性值为空");
        }

        // Business key comes from the field annotated with @EsLogicId; exactly one is required.
        int idAnnotationCount = 0;
        String primaryKeyName = "";
        Object idValue = null;
        for (Field field : clazz.getDeclaredFields()) {
            EsLogicId idAnnotation = field.getAnnotation(EsLogicId.class);
            if (idAnnotation == null) {
                continue;
            }
            idAnnotationCount++;
            primaryKeyName = idAnnotation.name();
            field.setAccessible(true);
            idValue = field.get(entity);
        }

        if (idAnnotationCount != 1) {
            throw new IllegalStateException(clazz.getSimpleName() + "类@EsLogicId注解为空或者有多个");
        }
        if (StringUtils.isEmpty(primaryKeyName)) {
            throw new IllegalStateException(clazz.getSimpleName() + "类@EsLogicId注解name()值为空");
        }

        Paramater paramater = new Paramater();
        paramater.setIndex(indexName);
        paramater.setIdName(primaryKeyName);
        paramater.setIdValue(idValue);
        return paramater;
    }

    /**
     * Asks Elasticsearch whether the given index exists.
     *
     * @param index name of the index to look up
     * @return the result of the client's index-exists call
     * @throws IOException propagated from the client
     */
    protected boolean isIndexExist(String index) throws IOException {
        GetIndexRequest existsRequest = new GetIndexRequest();
        existsRequest.indices(index);
        return restHighLevelClient.indices().exists(existsRequest, RequestOptions.DEFAULT);
    }

    /**
     * Creates index {@code indexName} (1 shard, 1 replica) with a mapping derived from the
     * {@link EsField} annotations on the fields declared by {@code clazz}.
     *
     * <p>A field whose type is not nested is mapped as {@code {"type": <type>}}. A nested field
     * is mapped one level deep: the {@code @EsField} fields of its class — or, for a
     * collection, of the collection's element class — become its {@code properties}.
     *
     * <p>If Elasticsearch rejects the create call with an {@link ElasticsearchStatusException}
     * (for example because the index already exists), the error is logged and {@code false} is
     * returned. {@code IndexExistException} stays in the signature for compatibility; this
     * method does not throw it itself.
     *
     * @param indexName name of the index to create
     * @param clazz     bean class whose {@code @EsField} annotations describe the mapping
     * @return {@code true} only if both the request and the shards were acknowledged
     * @throws IllegalFieldException if an {@code @EsField} type is not supported, or the element
     *                               type of a nested collection cannot be determined
     * @throws IOException           propagated from the client
     */
    protected boolean creasteIndex(String indexName, Class clazz) throws IndexExistException, IllegalFieldException, IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(Settings.builder()
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 1)
        );

        // Field types this helper accepts in @EsField.type().
        Set<String> allowedTypes = new HashSet<>(Arrays.asList(
                "boolean", "long", "text", "keyword", "date", "half_float", "double", "nested"));

        Map<String, Object> properties = new HashMap<>();
        for (Field field : clazz.getDeclaredFields()) {
            EsField column = field.getAnnotation(EsField.class);
            if (column == null) {
                continue;
            }

            if (!EsTypes.NESTED.equals(column.type())) {
                properties.put(column.column_name(), buildLeafMapping(column, allowedTypes));
                continue;
            }

            // Nested field: describe the @EsField members of the nested (element) class.
            Class<?> nestedType = resolveNestedElementType(field);
            if (nestedType == null) {
                continue;
            }
            Map<String, Object> subProperties = new HashMap<>();
            for (Field subField : nestedType.getDeclaredFields()) {
                log.info(subField.toString());
                EsField subColumn = subField.getAnnotation(EsField.class);
                if (subColumn != null) {
                    subProperties.put(subColumn.column_name(), buildLeafMapping(subColumn, allowedTypes));
                }
            }
            Map<String, Object> nestedMapping = new HashMap<>();
            nestedMapping.put("type", column.type());
            nestedMapping.put("properties", subProperties);
            properties.put(column.column_name(), nestedMapping);
        }

        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", properties);
        request.mapping(mapping);

        try {
            CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            return response.isAcknowledged() && response.isShardsAcknowledged();
        } catch (ElasticsearchStatusException e) {
            // Previously only logged, after which the null response was dereferenced (NPE).
            log.error("创建索引[{}]失败: {}", indexName, e.getMessage(), e);
            return false;
        }
    }

    /** Builds the {@code {"type": ...}} mapping entry for one {@code @EsField}, validating its type. */
    private Map<String, Object> buildLeafMapping(EsField column, Set<String> allowedTypes) throws IllegalFieldException {
        if (!allowedTypes.contains(column.type())) {
            throw new IllegalFieldException("字段[" + column.column_name() + "]上的注解EsField, type值有非法的字段类型[" + column.type() + "]");
        }
        Map<String, Object> leaf = new HashMap<>();
        leaf.put("type", column.type());
        return leaf;
    }

    /**
     * Returns the class whose fields describe a nested field: the field's own type, or the
     * element type when the field is a collection. Returns {@code null} for primitive fields,
     * which are skipped by the caller.
     */
    private Class<?> resolveNestedElementType(Field field) throws IllegalFieldException {
        Class<?> fieldType = field.getType();
        if (fieldType.isPrimitive()) {
            return null;
        }
        boolean collectionLike = Collection.class.isAssignableFrom(fieldType) || fieldType == Iterable.class;
        if (!collectionLike) {
            return fieldType;
        }
        // Collections need their generic argument; raw or wildcard declarations cannot be mapped.
        java.lang.reflect.Type genericType = field.getGenericType();
        if (genericType instanceof ParameterizedType) {
            java.lang.reflect.Type[] typeArguments = ((ParameterizedType) genericType).getActualTypeArguments();
            if (typeArguments.length > 0 && typeArguments[0] instanceof Class) {
                return (Class<?>) typeArguments[0];
            }
        }
        throw new IllegalFieldException("字段[" + field.getName() + "]标注为nested, 但无法解析其集合元素类型(缺少明确的泛型参数)");
    }
}